package cn.zd.demo.flink.kafka2db.mapfunction;

import cn.zd.demo.flink.kafka2db.dto.KafkaMsgDto;
import com.alibaba.fastjson2.JSON;
import org.apache.flink.api.common.functions.MapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JsonMapFunction<S, K> implements MapFunction<String,KafkaMsgDto> {
private static final Logger logger = LoggerFactory.getLogger(JsonMapFunction.class);
    private String topic;
    public JsonMapFunction(String topic) {
        this.topic = topic;
    }

    @Override
    public KafkaMsgDto map(String val) {
        logger.debug("start json formate:{}",val);
        KafkaMsgDto dto = JSON.parseObject(val, KafkaMsgDto.class);
        dto.setTopic(topic);
        return dto;
    }
}

